Build initial portfolio type handlers for datamanager #633
Conversation
> **Note**: CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

**Walkthrough**

Adds a new portfolio routes module and wires it into the datamanager app router. Implements POST/GET /portfolio using Axum, in-memory Parquet -> S3 upload, and DuckDB httpfs reads from S3 with AWS credentials. Also renames two prediction handler functions and updates router registrations.
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    autonumber
    participant C as Client
    participant App as Datamanager App Router
    participant PR as Portfolio Router
    participant H as Handlers
    participant S3 as Amazon S3
    participant AK as AWS Config
    participant DB as DuckDB (httpfs)
    rect rgba(230,245,255,0.6)
        note over C,App: POST /portfolio
        C->>App: POST /portfolio { DataFrame, timestamp }
        App->>PR: route to portfolio
        PR->>H: save_portfolio(payload)
        H->>H: DataFrame -> Parquet (in-memory)
        H->>S3: PUT parquet (key YYYY/MM/DD/...)
        S3-->>H: 200 + s3_key
        H-->>C: 200 { rows, s3_key }
    end
    rect rgba(240,255,230,0.6)
        note over C,App: GET /portfolio
        C->>App: GET /portfolio?date=...
        App->>PR: route to portfolio
        PR->>H: get_portfolio(params)
        H->>AK: load AWS creds
        AK-->>H: creds
        H->>DB: CONFIGURE httpfs with creds
        H->>DB: SELECT ... FROM s3://.../parquet
        DB-->>H: rows/DataFrame
        H-->>C: 200 { data }
    end
    rect rgba(255,240,230,0.6)
        note over H: On failure
        H-->>C: 500 { error }
    end
```
**Estimated code review effort**: 🎯 4 (Complex) | ⏱️ ~60 minutes
**Pre-merge checks**: ❌ Failed checks (1 warning), ✅ Passed checks (2 passed)
Pull Request Overview
This PR adds basic portfolio management functionality to the datamanager application by implementing save and query operations for portfolio data. The changes establish portfolio type handlers similar to the existing prediction functionality.
- Adds portfolio data structure and handlers for saving/querying portfolio information
- Updates prediction routes to use plural naming conventions for consistency
- Integrates portfolio routes into the main application router
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| applications/datamanager/src/routes/prediction.rs | Renames functions to use plural forms for consistency |
| applications/datamanager/src/routes/portfolio.rs | Implements new portfolio handlers with save/query functionality |
| applications/datamanager/src/routes/mod.rs | Exports the new portfolio module |
| applications/datamanager/src/lib.rs | Integrates portfolio router into the main application |
Actionable comments posted: 3
> **Caution**: Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
applications/datamanager/src/routes/prediction.rs (2)
**78-86**: Return 500 on upload failure (not 200). Failure path currently responds 200 OK, masking errors to clients.

Apply:

```diff
-            (
-                StatusCode::OK,
-                format!("S3 upload failed: {}\n\n{}", err, json_output),
-            )
+            (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                format!("S3 upload failed: {}\n\n{}", err, json_output),
+            )
```
**154-161**: Content-Type says JSON but body is not JSON. `DataFrame::to_string()` is plain text. Either serialize to JSON or set text/plain. Minimal fix:

```diff
-        .insert(header::CONTENT_TYPE, "application/json".parse().unwrap());
+        .insert(header::CONTENT_TYPE, "text/plain; charset=utf-8".parse().unwrap());
```

If you want JSON, switch to a proper JSON serializer for `DataFrame` and keep `application/json`.
🧹 Nitpick comments (7)
applications/datamanager/src/routes/prediction.rs (3)
**146-151**: Guard empty requests to avoid invalid SQL. `WHERE ticker IN ()` is invalid when `positions` is empty. Add early return:

```diff
 async fn query_predictions(
     State(state): State<AppState>,
     Json(payload): Json<QueryPredictionsPayload>,
 ) -> impl IntoResponse {
     info!("Fetching predictions from S3");
+    if payload.positions.is_empty() {
+        let mut response = Response::new(Body::from("[]"));
+        response
+            .headers_mut()
+            .insert(header::CONTENT_TYPE, "application/json".parse().unwrap());
+        *response.status_mut() = StatusCode::OK;
+        return response;
+    }
```
**233-238**: Avoid naive string interpolation for tickers. Directly embedding values risks SQL syntax issues and future injection bugs. Prefer a VALUES list joined on `ticker`, or at least escape single quotes in inputs.
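A minimal sketch of the single-quote-escaping approach (the helper name `sql_in_list` and its signature are hypothetical, not code from this PR):

```rust
// Hypothetical helper: build a quoted SQL IN-list from tickers, doubling
// embedded single quotes (standard SQL string escaping). Returns None for
// an empty slice so the caller can skip the WHERE clause entirely.
fn sql_in_list(tickers: &[String]) -> Option<String> {
    if tickers.is_empty() {
        return None;
    }
    let quoted: Vec<String> = tickers
        .iter()
        .map(|t| format!("'{}'", t.replace('\'', "''")))
        .collect();
    Some(format!("({})", quoted.join(", ")))
}

fn main() {
    let tickers = vec!["AAPL".to_string(), "O'REILLY".to_string()];
    // Prints: ('AAPL', 'O''REILLY')
    println!("{}", sql_in_list(&tickers).unwrap());
}
```

This keeps the empty-input guard and the quoting concern in one place; a parameterized VALUES join would be the more robust long-term option.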
**131-132**: Consider a more specific Parquet content type. `application/x-parquet` is more precise than `application/octet-stream`.

```diff
-        .content_type("application/octet-stream")
+        .content_type("application/x-parquet")
```

applications/datamanager/src/routes/portfolio.rs (4)
**97-101**: Use tracing instead of println for structured logs. Aligns with the rest of the module and preserves log metadata.

```diff
-    println!(
-        "DataFrame successfully converted to parquet, size: {} bytes",
-        buffer.len()
-    );
+    debug!(
+        "DataFrame successfully converted to parquet, size: {} bytes",
+        buffer.len()
+    );
```
**131-147**: Allow querying by timestamp (don’t hardcode `Utc::now`). Using “now” will often miss data. Accept a query param (e.g., `?ts=...`) or a payload with a date; default to latest only if not provided. Happy to sketch the Axum extractor and parsing if you want it in this PR.
**116-116**: Parquet content type (optional). Consider `application/x-parquet` instead of `application/octet-stream`.

```diff
-        .content_type("application/octet-stream")
+        .content_type("application/x-parquet")
```
**75-129**: DRY: Share S3 Parquet upload helper with predictions. `upload_dataframe_to_s3` is nearly identical across modules. Centralize to avoid drift.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- applications/datamanager/src/lib.rs (2 hunks)
- applications/datamanager/src/routes/mod.rs (1 hunks)
- applications/datamanager/src/routes/portfolio.rs (1 hunks)
- applications/datamanager/src/routes/prediction.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
applications/datamanager/src/lib.rs (1)
applications/datamanager/src/routes/portfolio.rs (1)
- `router` (231-235)
applications/datamanager/src/routes/portfolio.rs (1)
applications/datamanager/src/routes/prediction.rs (3)
- `upload_dataframe_to_s3` (90-144)
- `query_s3_parquet_data` (173-280)
- `router` (282-286)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Run Python code checks
- GitHub Check: Run Rust code checks
- GitHub Check: Run Python code checks
- GitHub Check: Run Rust code checks
🔇 Additional comments (3)
applications/datamanager/src/routes/prediction.rs (1)
**283-286**: Router mapping rename looks good. Handlers and routes align with the new names.
applications/datamanager/src/lib.rs (1)
**74-75**: Portfolio router successfully merged. Composition order looks fine and state layering is correct.
applications/datamanager/src/routes/mod.rs (1)
**3-3**: Module export added correctly. `pub mod portfolio;` exposes the new routes module as intended.
Actionable comments posted: 0
♻️ Duplicate comments (3)
applications/datamanager/src/routes/portfolio.rs (3)
**86-90**: Path now matches write/read location — good fix. S3 key uses `equity/portfolios/...` and aligns with the GET path. This resolves the earlier mismatch with predictions.
**194-207**: Valid DuckDB Parquet reference — good. Using `SELECT * FROM 's3://...parquet'` fixes the invalid FROM clause pattern flagged earlier.
**222-231**: Don’t drop row mapping errors; collect and propagate. `filter_map(Result::ok)` silently discards failures. Collect into `Result<Vec<_>, _>` and bubble errors; also avoid cloning Strings when building the DataFrame.

Apply:

```diff
-    let portfolios: Vec<Portfolio> = portfolios_iterator.filter_map(Result::ok).collect();
-
-    df!(
-        "ticker" => portfolios.iter().map(|p| p.ticker.clone()).collect::<Vec<_>>(),
-        "timestamp" => portfolios.iter().map(|p| p.timestamp).collect::<Vec<_>>(),
-        "side" => portfolios.iter().map(|p| p.side.clone()).collect::<Vec<_>>(),
-        "dollar_amount" => portfolios.iter().map(|p| p.dollar_amount).collect::<Vec<_>>(),
-    )
+    let portfolios: Vec<Portfolio> = portfolios_iterator
+        .collect::<Result<Vec<_>, _>>()
+        .map_err(|e| Error::Other(format!("Failed to collect portfolio rows: {}", e)))?;
+
+    df!(
+        "ticker" => portfolios.iter().map(|p| p.ticker.as_str()).collect::<Vec<_>>(),
+        "timestamp" => portfolios.iter().map(|p| p.timestamp).collect::<Vec<_>>(),
+        "side" => portfolios.iter().map(|p| p.side.as_str()).collect::<Vec<_>>(),
+        "dollar_amount" => portfolios.iter().map(|p| p.dollar_amount).collect::<Vec<_>>(),
+    )
```
🧹 Nitpick comments (7)
applications/datamanager/src/routes/portfolio.rs (7)
**63-71**: Avoid echoing entire DataFrame in 500 responses. Large payloads in error messages can bloat responses and leak data. Return shape only and log at error level.

Apply:

```diff
-        Err(err) => {
-            info!("Failed to upload to S3: {}", err);
-            let json_output = portfolio.to_string();
-
-            (
-                StatusCode::INTERNAL_SERVER_ERROR,
-                format!("S3 upload failed: {}\n\n{}", err, json_output),
-            )
-        }
+        Err(err) => {
+            error!("Failed to upload portfolio to S3: {}", err);
+            (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                format!(
+                    "S3 upload failed: {} (rows={}, cols={})",
+                    err,
+                    portfolio.height(),
+                    portfolio.width()
+                ),
+            )
+        }
```

Also update the tracing import:

```diff
-use tracing::{debug, info};
+use tracing::{debug, info, error};
```
**139-145**: Use error! for failure path. Log GET failures at error level.

Apply:

```diff
-        Err(err) => {
-            info!("Failed to query S3 parquet data: {}", err);
+        Err(err) => {
+            error!("Failed to query S3 parquet data: {}", err);
             (
                 StatusCode::INTERNAL_SERVER_ERROR,
                 format!("Failed to fetch portfolio: {}", err),
             )
         }
```

And ensure `error` is imported:

```diff
-use tracing::{debug, info};
+use tracing::{debug, info, error};
```
**97-101**: Use tracing instead of println!. Prefer debug! to keep logs structured and adjustable by level.

Apply:

```diff
-    println!(
-        "DataFrame successfully converted to parquet, size: {} bytes",
-        buffer.len()
-    );
+    debug!(
+        "DataFrame successfully converted to parquet, size: {} bytes",
+        buffer.len()
+    );
```
**116-116**: Set a Parquet-specific content type. `application/vnd.apache.parquet` is clearer than octet-stream.

Apply:

```diff
-        .content_type("application/octet-stream")
+        .content_type("application/vnd.apache.parquet")
```
**131-139**: GET should accept a date; `Utc::now()` rarely matches the uploaded partition. Make the timestamp optional via query string and default to now.

Apply:

```diff
-async fn get_portfolio(State(state): State<AppState>) -> impl IntoResponse {
+async fn get_portfolio(
+    State(state): State<AppState>,
+    Query(params): Query<PortfolioQuery>,
+) -> impl IntoResponse {
     info!("Fetching portfolio from S3");
-    match query_s3_parquet_data(&state, &Utc::now()).await {
+    let ts = params.timestamp.unwrap_or_else(Utc::now);
+    match query_s3_parquet_data(&state, &ts).await {
         Ok(df) => {
             let json_output = df.to_string();
             (StatusCode::OK, json_output)
         }
         Err(err) => {
```

Add the helper type and import (outside this hunk):

```rust
use axum::extract::Query;

#[derive(Deserialize)]
struct PortfolioQuery {
    #[serde(default)]
    timestamp: Option<DateTime<Utc>>,
}
```
**75-129**: Shared helper duplication with prediction.rs. `upload_dataframe_to_s3` mirrors prediction.rs; consider extracting a reusable util (e.g., `s3::parquet::upload_dataframe(state, df, key_prefix)`).

I can sketch a small crate-local module with generic key builders for `equity/{predictions|portfolios}/daily/...` and a single upload function.
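For illustration, such a shared key builder could look like the following minimal sketch (the function name, signature, and the `data.parquet` file name are assumptions, not code from this PR):

```rust
// Hypothetical crate-local helper: derive the dated S3 object key for a
// dataset, so the prediction and portfolio modules share one key layout
// instead of each formatting equity/<dataset>/YYYY/MM/DD paths by hand.
fn parquet_key(dataset: &str, year: i32, month: u32, day: u32) -> String {
    format!("equity/{dataset}/{year:04}/{month:02}/{day:02}/data.parquet")
}

fn main() {
    // Prints: equity/portfolios/2024/03/05/data.parquet
    println!("{}", parquet_key("portfolios", 2024, 3, 5));
}
```

Both the PUT path and the DuckDB `s3://` read would then call the same function, which removes the write/read path mismatch class of bug flagged earlier in this review.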
**18-18**: Import error to match new error-level logs.

Apply:

```diff
-use tracing::{debug, info};
+use tracing::{debug, info, error};
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- applications/datamanager/src/routes/portfolio.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
applications/datamanager/src/routes/portfolio.rs (1)
applications/datamanager/src/routes/prediction.rs (2)
- `upload_dataframe_to_s3` (90-144)
- `query_s3_parquet_data` (173-280)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Run Rust code checks
- GitHub Check: Run Python code checks
- GitHub Check: Run Rust code checks
- GitHub Check: Run Python code checks
🔇 Additional comments (1)
applications/datamanager/src/routes/portfolio.rs (1)
**30-34**: Confirm serde support for DataFrame payloads. applications/datamanager/Cargo.toml enables polars' "serde" feature (`polars = { version = "0.50.0", features = [..., "serde", ...] }`); chrono also has serde enabled — the `SavePortfolioPayload` serde usage is covered.
Overview
Changes
- `Portfolio` type

Comments

This is going to look really similar to everything in `prediction.rs` and `equity.rs`, but I'm going to cut a separate pull request and do some refactoring.

Summary by CodeRabbit
New Features
Refactor